[FEATURE] Improve EncryptorImpl with Asynchronous Handling for Scalability #3919
…ility Removed the usage of CountDownLatch. Every request is now submitted and returns a Future. Added a list to track ongoing master key generation: if a tenant ID is in the list, its key generation is in progress, and the caller waits until the other thread completes it. Meanwhile the system keeps accepting other requests. If the key is already available in the map, the request proceeds; otherwise key generation for the new tenant starts in a different thread. So key generation for multiple tenants can happen simultaneously.
Resolves opensearch-project#3510
Signed-off-by: Abdul Muneer Kolarkunnu <[email protected]>
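For readers who want a concrete picture of the per-tenant behavior described in the commit message, here is a minimal sketch. It is not the PR's implementation: it swaps the "list of ongoing generations" for a `ConcurrentHashMap` of futures, and `TenantKeyCache`, `masterKey`, and `generateKey` are hypothetical names. The key string is a placeholder for the real master key lookup or generation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the per-tenant master key bookkeeping described above.
class TenantKeyCache {
    private final ConcurrentHashMap<String, CompletableFuture<String>> keys = new ConcurrentHashMap<>();
    private final ExecutorService executor;
    final AtomicInteger generations = new AtomicInteger(); // counts real key generations

    TenantKeyCache(ExecutorService executor) {
        this.executor = executor;
    }

    // Returns the tenant's key future; generation starts only if none is cached or in flight.
    CompletableFuture<String> masterKey(String tenantId) {
        return keys.computeIfAbsent(tenantId,
            id -> CompletableFuture.supplyAsync(() -> generateKey(id), executor));
    }

    // Placeholder (assumption) for the real, slow key generation or index lookup.
    private String generateKey(String tenantId) {
        generations.incrementAndGet();
        return "key-for-" + tenantId;
    }
}

class TenantKeyCacheDemo {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            TenantKeyCache cache = new TenantKeyCache(executor);
            CompletableFuture<String> a1 = cache.masterKey("tenant-a");
            CompletableFuture<String> a2 = cache.masterKey("tenant-a"); // shares a1's future
            CompletableFuture<String> b = cache.masterKey("tenant-b");  // runs independently
            System.out.println(a1.join() + " " + a2.join() + " " + b.join());
            System.out.println("generations = " + cache.generations.get()); // generations = 2
        } finally {
            executor.shutdown();
        }
    }
}
```

One design caveat with this sketch: a future that completes exceptionally stays cached, so a real implementation would likely need to evict failed entries before retrying.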
Awesome! Thanks for raising the PR. This will be a great improvement. I'll start actively reviewing this PR tomorrow. Could you also update the PR description with details on how you tested this for both single tenancy and multi-tenancy?
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #3919      +/-   ##
============================================
  Coverage     80.39%   80.40%
- Complexity     7910     7915       +5
============================================
  Files           693      693
  Lines         34849    34863      +14
  Branches       3872     3877       +5
============================================
+ Hits          28018    28030      +12
  Misses         5096     5096
- Partials       1735     1737       +2
if (decrypted.get(key) != null) {
    decryptedCredential.put(key, decrypted.get(key).get());
} else {
    decryptedCredential.put(key, null);
When the value for a key is null, you put null into the map. What is the intention here? Why not skip the key in that case?
+1, why are we assigning null here?
+1, why not use putIfAbsent()?
        decryptedCredential.put(key, null);
    }
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
Please add a meaningful exception message that indicates what went wrong here, something like "Failed to fill credentials".
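The comments in this thread (skip null values, give the exception a useful message) could be combined roughly as follows. This is only a sketch under assumptions: `CredentialResolver` and `resolve` are hypothetical names, `IllegalStateException` stands in for whatever exception type the project prefers, and skipping null futures is one possible answer to the reviewers' question, not necessarily the author's intent.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class CredentialResolver {
    // Waits for every pending value and returns a new map of resolved credentials.
    static Map<String, String> resolve(Map<String, Future<String>> pending) {
        Map<String, String> resolved = new HashMap<>();
        for (Map.Entry<String, Future<String>> entry : pending.entrySet()) {
            Future<String> future = entry.getValue();
            if (future == null) {
                continue; // assumption: skip missing values instead of storing null
            }
            try {
                resolved.put(entry.getKey(), future.get());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
                throw new IllegalStateException("Interrupted while resolving credential '" + entry.getKey() + "'", e);
            } catch (ExecutionException e) {
                // Report the underlying cause rather than the ExecutionException wrapper.
                throw new IllegalStateException("Failed to resolve credential '" + entry.getKey() + "'", e.getCause());
            }
        }
        return resolved;
    }
}

class CredentialResolverDemo {
    public static void main(String[] args) {
        Map<String, Future<String>> pending = new HashMap<>();
        pending.put("key", CompletableFuture.completedFuture("secret"));
        pending.put("missing", null);
        System.out.println(CredentialResolver.resolve(pending)); // {key=secret}
    }
}
```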
final CryptoResult<byte[], JceMasterKey> encryptResult = crypto
    .encryptData(jceMasterKey, plainText.getBytes(StandardCharsets.UTF_8));
return Base64.getEncoder().encodeToString(encryptResult.getResult());
public Future<String> encrypt(String plainText, String tenantId) {
Why not use try/catch for better error handling?
return CompletableFuture.supplyAsync(() -> {
    try {
        if (plainText == null || tenantId == null) {
            throw new IllegalArgumentException("plainText and tenantId cannot be null");
        }
        initMasterKey(tenantId);
        final AwsCrypto crypto = createAwsCrypto();
        JceMasterKey jceMasterKey = createJceMasterKey(tenantId);
        final CryptoResult<byte[], JceMasterKey> encryptResult = crypto
            .encryptData(jceMasterKey, plainText.getBytes(StandardCharsets.UTF_8));
        return Base64.getEncoder().encodeToString(encryptResult.getResult());
    } catch (Exception e) {
        throw new CompletionException("Encryption failed", e);
    }
});
    this.decryptedHeaders = createDecryptedHeaders(headers);
}

private void fillCredential(Map<String, Future<String>> decrypted, Map<String, String> decryptedCredential) {
Input validation?
if (decrypted == null || decryptedCredential == null) {
throw new IllegalArgumentException("Input maps cannot be null");
}
Better yet, since we're creating a new map for decryptedCredentials, we don't care what it is if we immediately overwrite it here.
There are some issues with these changes, mainly in initializeNewMasterKey(). I am looking into those.
@@ -79,9 +80,9 @@ public interface Connector extends ToXContentObject, Writeable {

     <T> T createPayload(String action, Map<String, String> parameters);

-    void decrypt(String action, BiFunction<String, String, String> function, String tenantId);
+    void decrypt(String action, BiFunction<String, String, Future<String>> function, String tenantId);
Consider an ActionFuture here. It has better handling of OpenSearch-specific thread pools, exceptions, and task APIs.
decryptedCredential = new HashMap<>();
for (String key : credential.keySet()) {
    decrypted.put(key, function.apply(credential.get(key), tenantId));
    decryptingTempCredential.put(key, function.apply(credential.get(key), tenantId));
}
this.decryptedCredential = decrypted;
fillCredential(decryptingTempCredential, decryptedCredential);
This section of code is very confusing. It took me a good 10 minutes to figure out that we're updating a superclass field. Suggestions to improve readability:
- Keep the this. (or maybe even super.) prefix to make it clear this isn't a local variable.
- It makes little sense to create an empty map (in a superclass field) and do nothing with it but pass it as an argument. If you just called fillCredential(tempMap, this.(en|de)cryptedCredential) and created the empty map as the first step in that method, it'd be more readable.
Similar comments apply to both en/de and Http/Mcp connectors.
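One way to read the readability suggestions above is the following sketch. All class names (`SketchAbstractConnector`, `SketchHttpConnector`) and the `awaitAll` helper are hypothetical, and the sketch goes one step further than the comment by having the helper return the new map instead of filling one passed in.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

// Hypothetical superclass that owns the credential fields.
abstract class SketchAbstractConnector {
    protected Map<String, String> credential = new HashMap<>();
    protected Map<String, String> decryptedCredential;
}

class SketchHttpConnector extends SketchAbstractConnector {
    void decrypt(BiFunction<String, String, Future<String>> function, String tenantId) {
        Map<String, Future<String>> pending = new HashMap<>();
        for (Map.Entry<String, String> entry : this.credential.entrySet()) {
            pending.put(entry.getKey(), function.apply(entry.getValue(), tenantId));
        }
        // The this. prefix signals that a (superclass) field is assigned, not a local.
        this.decryptedCredential = awaitAll(pending);
    }

    // Creates the result map itself, so callers never pass in an empty map to fill.
    private static Map<String, String> awaitAll(Map<String, Future<String>> pending) {
        Map<String, String> resolved = new HashMap<>();
        for (Map.Entry<String, Future<String>> entry : pending.entrySet()) {
            try {
                resolved.put(entry.getKey(), entry.getValue().get());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while decrypting " + entry.getKey(), e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("Failed to decrypt " + entry.getKey(), e.getCause());
            }
        }
        return resolved;
    }
}

class SketchHttpConnectorDemo {
    public static void main(String[] args) {
        SketchHttpConnector connector = new SketchHttpConnector();
        connector.credential.put("key", "abc");
        connector.decrypt((value, tenantId) -> CompletableFuture.completedFuture("decrypted: " + value), "tenant-a");
        System.out.println(connector.decryptedCredential); // {key=decrypted: abc}
    }
}
```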
        decryptedCredential.put(key, null);
    }
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
If you switch from Future to ActionFuture, some of this exception handling is already included.
encryptFunction = (s, v) -> CompletableFuture.supplyAsync(() -> "encrypted: " + s.toLowerCase(Locale.ROOT));
decryptFunction = (s, v) -> CompletableFuture.supplyAsync(() -> "decrypted: " + s.toUpperCase(Locale.ROOT));
You're not including an Executor argument here, which means the async part will execute on the ForkJoinPool.commonPool(). This can't be shut down and in theory could trigger spurious thread leak detection.
You should be using a thread pool here. Example.
Similar comment in HttpConnectorTest and AbstractConnectorTest.
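As a rough illustration of the point about the `Executor` argument, the two-argument overload of `CompletableFuture.supplyAsync` runs the task on a pool the caller owns and can shut down. In this sketch a plain `ExecutorService` stands in for whatever OpenSearch thread pool the tests would actually use, and `ExecutorBackedFunctions` and `encryptOn` are hypothetical names.

```java
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;

class ExecutorBackedFunctions {
    // Builds a fake encrypt function whose async work runs on the given executor,
    // not on ForkJoinPool.commonPool().
    static BiFunction<String, String, Future<String>> encryptOn(ExecutorService executor) {
        return (s, tenantId) -> CompletableFuture.supplyAsync(
            () -> "encrypted: " + s.toLowerCase(Locale.ROOT), executor);
    }
}

class ExecutorBackedFunctionsDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            BiFunction<String, String, Future<String>> encrypt = ExecutorBackedFunctions.encryptOn(executor);
            System.out.println(encrypt.apply("SECRET", "tenant-a").get()); // encrypted: secret
        } finally {
            // The pool is owned by the test, so it can be shut down deterministically.
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        }
    }
}
```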
return CompletableFuture.supplyAsync(() -> {
    initMasterKey(tenantId);
    final AwsCrypto crypto = AwsCrypto.builder().withCommitmentPolicy(CommitmentPolicy.RequireEncryptRequireDecrypt).build();
    JceMasterKey jceMasterKey = createJceMasterKey(tenantId);

    final CryptoResult<byte[], JceMasterKey> encryptResult = crypto
        .encryptData(jceMasterKey, plainText.getBytes(StandardCharsets.UTF_8));
    return Base64.getEncoder().encodeToString(encryptResult.getResult());
});
Here is where using ForkJoinPool.commonPool() is actually bad and you really do need a thread pool.
Hey @akolarkunnu, first I want to thank you for your contribution! I've experienced flaky test behavior from this particular class (see #2888) and this may help with that.
That said, I've made multiple implementations on OpenSearch plugins and our Remote Metadata SDK using futures and learned some hard lessons along the way. I did leave a line-by-line review above but wanted to follow up with some general comments.
- TLDR on using Futures... avoid them if you can implement something with an ActionListener. These are longstanding, well-established, and tested callback mechanisms that handle most asynchronous work in OpenSearch. When you have a single "thread" with async breaks they are almost always the correct way to handle it. When you are awaiting multiple things to happen, that's where it gets complex. That may be the case here.
- OpenSearch has an ActionFuture class that doubles as an ActionListener. It has an actionGet() method that implements some exception handling (unwrapping the nested exceptions, etc.) that is pretty useful. Please take a look at using that.
- Generally speaking, I see the code replacing an entire map with an entirely new map every time an encrypt/decrypt call is made. It's hard for me to think this approach is thread safe without synchronization of the map. It seems to me we can probably easily add to a map without touching the other keys, but the existing "replace the map" implementation raises a lot of questions.
- Consider an approach using an AtomicReference to the decryptedCredential map, taking advantage of the updateAndGet() method. (I'm not sure that will work here, but it seems a possible improvement over iterate-all-and-replace-without-atomicity.)
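The AtomicReference idea in the last bullet might look something like the sketch below. `CredentialStore`, `put`, and `snapshot` are hypothetical names, and whether this fits the connector classes is exactly the open question the reviewer raises. The technique shown is copy-on-write: each update publishes a new immutable map atomically, so readers never observe a half-filled map.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

class CredentialStore {
    private final AtomicReference<Map<String, String>> decryptedCredential =
        new AtomicReference<>(Collections.emptyMap());

    // Atomically publishes a copy of the current map with one entry added or replaced.
    // updateAndGet may re-run the lambda under contention, so it must stay side-effect free.
    Map<String, String> put(String key, String value) {
        return decryptedCredential.updateAndGet(current -> {
            Map<String, String> next = new HashMap<>(current);
            next.put(key, value);
            return Collections.unmodifiableMap(next);
        });
    }

    // Returns the currently published immutable map.
    Map<String, String> snapshot() {
        return decryptedCredential.get();
    }
}

class CredentialStoreDemo {
    public static void main(String[] args) throws InterruptedException {
        CredentialStore store = new CredentialStore();
        Runnable writerA = () -> { for (int i = 0; i < 500; i++) store.put("a" + i, "v"); };
        Runnable writerB = () -> { for (int i = 0; i < 500; i++) store.put("b" + i, "v"); };
        Thread t1 = new Thread(writerA);
        Thread t2 = new Thread(writerB);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(store.snapshot().size()); // 1000: no update is lost
    }
}
```

Copying the map on every write costs time proportional to its size, which seems acceptable for a handful of credentials but would be a poor fit for large maps.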
I couldn't agree more on this point.
Description
Related Issues
Resolves #3510